package com.katesoft.scale4j.rttp.subscription;

import static com.katesoft.scale4j.persistent.enums.CrudType.CREATE;
import static com.katesoft.scale4j.persistent.enums.CrudType.DELETE;
import static com.katesoft.scale4j.persistent.enums.CrudType.UPDATE;
import static com.katesoft.scale4j.rttp.internal.IRttpHazelcastRefs.SUBSCRIBER_EXECUTOR;

import java.io.Serializable;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

import net.jcip.annotations.ThreadSafe;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.hibernate.event.PostDeleteEvent;
import org.hibernate.event.PostDeleteEventListener;
import org.hibernate.event.PostInsertEvent;
import org.hibernate.event.PostInsertEventListener;
import org.hibernate.event.PostUpdateEvent;
import org.hibernate.event.PostUpdateEventListener;

import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.ExecutionCallback;
import com.katesoft.scale4j.common.model.IPrettyPrintableObject;
import com.katesoft.scale4j.persistent.enums.CrudType;
import com.katesoft.scale4j.persistent.model.unified.AbstractPersistentEntity;
import com.katesoft.scale4j.rttp.internal.RttpHazelcastBridgeAwareImpl;

/**
 * This class allows to subscribe for insert, update, delete events of any domain entity you like.
 * 
 * @author kate2007
 */
@ThreadSafe
public class CloudSubscriber extends RttpHazelcastBridgeAwareImpl implements
         PostInsertEventListener, PostUpdateEventListener, PostDeleteEventListener, Serializable {
   private static final long serialVersionUID = -133563027585572128L;
   private Collection<ICloudSubscription<? super AbstractPersistentEntity>> cloudSubscriptions;

   public CloudSubscriber() {
      cloudSubscriptions = new LinkedHashSet<ICloudSubscription<? super AbstractPersistentEntity>>(
               0);
   }

   @Override
   public final void onPostInsert(PostInsertEvent event) {
      if (event.getEntity() instanceof AbstractPersistentEntity) {
         AbstractPersistentEntity obj = (AbstractPersistentEntity) event.getEntity();
         logger.debug("received db insert for entity[class=%s, uid=%s]", obj.getPersistentClass()
                  .getSimpleName(), obj.getUniqueIdentifier());
         submitEvent(CREATE, obj);
      }
   }

   @Override
   public final void onPostUpdate(PostUpdateEvent event) {
      if (event.getEntity() instanceof AbstractPersistentEntity) {
         AbstractPersistentEntity obj = (AbstractPersistentEntity) event.getEntity();
         logger.debug("received db update for entity[class=%s, uid=%s]", obj.getPersistentClass()
                  .getSimpleName(), obj.getUniqueIdentifier());
         submitEvent(UPDATE, obj);
      }
   }

   @Override
   public final void onPostDelete(PostDeleteEvent event) {
      if (event.getEntity() instanceof AbstractPersistentEntity) {
         AbstractPersistentEntity obj = (AbstractPersistentEntity) event.getEntity();
         logger.debug("received db delete for entity[class=%s, uid=%s]", obj.getPersistentClass()
                  .getSimpleName(), obj.getUniqueIdentifier());
         submitEvent(DELETE, obj);
      }
   }

   /**
    * this method will create dedicated distributed thread for each business process that evaluated
    * successfully by predicate and submit distributed task to hazelcast engine.
    * 
    * @param type
    *           crud event type
    * @param obj
    *           domain entity
    */
   protected void submitEvent(CrudType type, AbstractPersistentEntity obj) {
      for (ICloudSubscription<? super AbstractPersistentEntity> next : cloudSubscriptions) {
         IUnaryPredicate<? super AbstractPersistentEntity> predicate = next.getPredicate();
         Class<? super AbstractPersistentEntity> subscriptionClass = next.getSubscriptionClass();
         if (subscriptionClass.isAssignableFrom(obj.getClass())) {
            Boolean b = predicate.execute(obj);
            logger.debug("%s.execute(%s/%s) = %s", predicate, obj.getClass().getSimpleName(),
                     obj.getGlobalUniqueIdentifier(), b.toString());
            Collection<? extends IBusinessProcess<? super AbstractPersistentEntity>> processes = next
                     .getProcesses();
            logger.debug("invoking %s business processes for entity(%s/%s)", processes.size(), obj
                     .getPersistentClass().getSimpleName(), obj.getGlobalUniqueIdentifier());
            final CountDownLatch latch = new CountDownLatch(processes.size());
            for (IBusinessProcess<? super AbstractPersistentEntity> process : processes) {
               logger.debug("%s.process(%s/%s)", process, obj.getPersistentClass().getSimpleName(),
                        obj.getGlobalUniqueIdentifier());
               BusinessServiceProcess serviceProcess = new BusinessServiceProcess(process,
                        predicate, obj, type);
               DistributedTask<Void> task = new DistributedTask<Void>(serviceProcess, null);
               task.setExecutionCallback(new ExecutionCallbackO(latch));
               bridge.getHazelcastInstance().getExecutorService(SUBSCRIBER_EXECUTOR).execute(task);
            }
            try {
               latch.await();
               logger.debug("execution of %s finished", processes);
            } catch (InterruptedException e) {
               logger.error(e);
            }
         }
      }
   }

   static final class ExecutionCallbackO implements ExecutionCallback<Void> {
      private final CountDownLatch latch;

      private ExecutionCallbackO(CountDownLatch latch) {
         this.latch = latch;
      }

      @Override
      public void done(@SuppressWarnings("unused") Future<Void> future) {
         latch.countDown();
      }
   }

   static final class BusinessServiceProcess implements Runnable, Serializable,
            IPrettyPrintableObject {
      private static final long serialVersionUID = -3698911142450030197L;

      private final IBusinessProcess<? super AbstractPersistentEntity> process;
      private final IUnaryPredicate<? super AbstractPersistentEntity> predicate;
      private final AbstractPersistentEntity entity;
      private final CrudType crudType;

      private BusinessServiceProcess(IBusinessProcess<? super AbstractPersistentEntity> process,
               IUnaryPredicate<? super AbstractPersistentEntity> predicate,
               AbstractPersistentEntity entity, CrudType crudType) {
         this.process = process;
         this.predicate = predicate;
         this.entity = entity;
         this.crudType = crudType;
      }

      @SuppressWarnings({ "unchecked", "rawtypes" })
      @Override
      public void run() {
         process.process(new SubscriptionEvent(crudType, entity));
      }

      @Override
      public String toString() {
         return String.format("%s.%s", predicate.getClass().getSimpleName(), process.getClass()
                  .getSimpleName());
      }

      @Override
      public String reflectionToString() {
         return ReflectionToStringBuilder.reflectionToString(this);
      }
   }

   /**
    * inject cloud subscribers. this is required field.
    * 
    * @param cloudSubscriptions
    *           collection of client subscribers.
    */
   public void setCloudSubscriptions(
            Collection<ICloudSubscription<? super AbstractPersistentEntity>> cloudSubscriptions) {
      this.cloudSubscriptions = cloudSubscriptions;
   }

   public Collection<ICloudSubscription<? super AbstractPersistentEntity>> getCloudSubscriptions() {
      return cloudSubscriptions;
   }
}
